In this lab, we will introduce the conception of Markov Decision Process(MDP) and two solution algorithms, and then we will introduce the Q-Learning and SARSA algorithm, finally we will use the Q-learning algorithm to train an agent to play "Flappy Bird" game.
A Markov decision process (MDP) is a random process, i.e. a sequence of random states S1, S2, ... with the Markov property. It provides a mathematical framework for modeling decision making in situations where outcomes are partly random and partly under the control of a decision maker.
There are two algorithms to solve the MDP problem: value iteration and policy iteration.
The algorithm is like below:
Next, we will show an example code of Gridworld to see how value iteration works.
please install gym.
commands: pip install gym
import numpy as np
import sys
from gym.envs.toy_text import discrete
# four actions in the game
UP = 0
RIGHT = 1
DOWN = 2
LEFT = 3
class GridworldEnv(discrete.DiscreteEnv):
"""
Grid World environment from Sutton's Reinforcement Learning book chapter 4.
You are an agent on an MxN grid and your goal is to reach the terminal
state at the top left or the bottom right corner.
For example, a 4x4 grid looks as follows:
T o o o
o x o o
o o o o
o o o T
x is your position and T are the two terminal states.
You can take actions in each direction (UP=0, RIGHT=1, DOWN=2, LEFT=3).
Actions going off the edge leave you in your current state.
You receive a reward of -1 at each step until you reach a terminal state.
"""
metadata = {'render.modes': ['human', 'ansi']}
def __init__(self, shape=[4, 4]):
if not isinstance(shape, (list, tuple)) or not len(shape) == 2:
raise ValueError('shape argument must be a list/tuple of length 2')
self.shape = shape
nS = np.prod(shape)
nA = 4
MAX_Y = shape[0]
MAX_X = shape[1]
P = {}
grid = np.arange(nS).reshape(shape)
it = np.nditer(grid, flags=['multi_index'])
while not it.finished:
s = it.iterindex
y, x = it.multi_index
P[s] = {a: [] for a in range(nA)}
is_done = lambda s: s == 0 or s == (nS - 1)
reward = 0.0 if is_done(s) else -1.0
# We're stuck in a terminal state
if is_done(s):
P[s][UP] = [(1.0, s, reward, True)]
P[s][RIGHT] = [(1.0, s, reward, True)]
P[s][DOWN] = [(1.0, s, reward, True)]
P[s][LEFT] = [(1.0, s, reward, True)]
# Not a terminal state
else:
ns_up = s if y == 0 else s - MAX_X
ns_right = s if x == (MAX_X - 1) else s + 1
ns_down = s if y == (MAX_Y - 1) else s + MAX_X
ns_left = s if x == 0 else s - 1
P[s][UP] = [(1.0, ns_up, reward, is_done(ns_up))]
P[s][RIGHT] = [(1.0, ns_right, reward, is_done(ns_right))]
P[s][DOWN] = [(1.0, ns_down, reward, is_done(ns_down))]
P[s][LEFT] = [(1.0, ns_left, reward, is_done(ns_left))]
it.iternext()
# Initial state distribution is uniform
isd = np.ones(nS) / nS
# We expose the model of the environment for educational purposes
# This should not be used in any model-free learning algorithm
self.P = P
super(GridworldEnv, self).__init__(nS, nA, P, isd)
def render(self, mode='human', close=False):
if close:
return
outfile = StringIO() if mode == 'ansi' else sys.stdout
grid = np.arange(self.nS).reshape(self.shape)
it = np.nditer(grid, flags=['multi_index'])
while not it.finished:
s = it.iterindex
y, x = it.multi_index
if self.s == s:
output = " x "
elif s == 0 or s == self.nS - 1:
output = " T "
else:
output = " o "
if x == 0:
output = output.lstrip()
if x == self.shape[1] - 1:
output = output.rstrip()
outfile.write(output)
if x == self.shape[1] - 1:
outfile.write("\n")
it.iternext()
env = GridworldEnv()
def value_iteration(env, theta=0.0001, discount_factor=1.0):
"""
Value Iteration Algorithm.
Args:
env: OpenAI env. env.P represents the transition probabilities of the environment.
env.P[s][a] is a list of transition tuples (prob, next_state, reward, done).
env.nS is a number of states in the environment.
env.nA is a number of actions in the environment.
theta: We stop evaluation once our value function change is less than theta for all states.
discount_factor: Gamma discount factor.
Returns:
A tuple (policy, V) of the optimal policy and the optimal value function.
"""
def one_step_lookahead(state, V):
"""
Given an state, calculate the new value function V(s) based on the value iteration algorithm
Args:
state: represents each state in the Gridworld, an integer
V: the current value function of the states(V(s)), the lengh is env.nS
Returns:
a new V(s)
"""
A = np.zeros(env.nA)
for a in range(env.nA):
for prob, next_state, reward, done in env.P[state][a]:
A[a] += prob * (reward + discount_factor * V[next_state])
return A
V = np.zeros(env.nS)
while True:
delta = 0
for s in range(env.nS):
# Do a one-step lookahead to find the best action
A = one_step_lookahead(s, V)
best_action_value = np.max(A)
# Calculate delta across all states seen so far
delta = max(delta, np.abs(best_action_value - V[s]))
# Update the value function
V[s] = best_action_value
# Check if we can stop
if delta < theta:
break
# Create a deterministic policy using the optimal value function
policy = np.zeros([env.nS, env.nA])
for s in range(env.nS):
# One step lookahead to find the best action for this state
A = one_step_lookahead(s, V)
best_action = np.argmax(A)
# Always take the best action
policy[s, best_action] = 1.0
return policy, V
policy, v = value_iteration(env)
print("Policy Probability Distribution:")
print(policy)
print("")
print("Reshaped Grid Policy (0=up, 1=right, 2=down, 3=left):")
print(np.reshape(np.argmax(policy, axis=1), env.shape))
print("")
print("Value Function:")
print(v)
print("")
print("Reshaped Grid Value Function:")
print(v.reshape(env.shape))
print("")
The algorithm is like below:
Next, we will show an example code of Gridworld to see how value iteration works.
import numpy as np
import sys
from gym.envs.toy_text import discrete
env = GridworldEnv()
def policy_eval(policy, env, discount_factor=1.0, theta=0.00001):
"""
Evaluate a policy given an environment and a full description of the environment's dynamics.
Args:
policy: [S, A] shaped matrix representing the policy.
env: OpenAI env. env.P represents the transition probabilities of the environment.
env.P[s][a] is a list of transition tuples (prob, next_state, reward, done).
env.nS is a number of states in the environment.
env.nA is a number of actions in the environment.
theta: We stop evaluation once our value function change is less than theta for all states.
discount_factor: Gamma discount factor.
Returns:
Vector of length env.nS representing the value function.
"""
# Start with a random (all 0) value function
V = np.zeros(env.nS)
while True:
delta = 0
# For each state, perform a "full backup"
for s in range(env.nS):
v = 0
# Look at the possible next actions
for a, action_prob in enumerate(policy[s]):
# For each action, look at the possible next states...
for prob, next_state, reward, done in env.P[s][a]:
# Calculate the expected value
v += action_prob * prob * (reward + discount_factor * V[next_state])
# How much our value function changed (across any states)
delta = max(delta, np.abs(v - V[s]))
V[s] = v
# Stop evaluating once our value function change is below a threshold
if delta < theta:
break
return np.array(V)
def policy_improvement(env, policy_eval_fn=policy_eval, discount_factor=1.0):
"""
Policy Improvement Algorithm. Iteratively evaluates and improves a policy
until an optimal policy is found.
Args:
env: The OpenAI environment.
policy_eval_fn: Policy Evaluation function that takes 3 arguments:
policy, env, discount_factor.
discount_factor: gamma discount factor.
Returns:
A tuple (policy, V).
policy is the optimal policy, a matrix of shape [S, A] where each state s
contains a valid probability distribution over actions.
V is the value function for the optimal policy.
"""
def one_step_lookahead(state, V):
"""
Helper function to calculate the value for all action in a given state.
Args:
state: The state to consider (int)
V: The value to use as an estimator, Vector of length env.nS
Returns:
A vector of length env.nA containing the expected value of each action.
"""
A = np.zeros(env.nA)
for a in range(env.nA):
for prob, next_state, reward, done in env.P[state][a]:
A[a] += prob * (reward + discount_factor * V[next_state])
return A
# Start with a random policy
policy = np.ones([env.nS, env.nA]) / env.nA
while True:
# Evaluate the current policy
V = policy_eval_fn(policy, env, discount_factor)
# Will be set to false if we make any changes to the policy
policy_stable = True
# For each state...
for s in range(env.nS):
# The best action we would take under the current policy
chosen_a = np.argmax(policy[s])
# Find the best action by one-step lookahead
# Ties are resolved arbitarily
action_values = one_step_lookahead(s, V)
best_a = np.argmax(action_values)
# Greedily update the policy
if chosen_a != best_a:
policy_stable = False
policy[s] = np.eye(env.nA)[best_a]
# If the policy is stable we've found an optimal policy. Return it
if policy_stable:
return policy, V
policy, v = policy_improvement(env)
print("Policy Probability Distribution:")
print(policy)
print("")
print("Reshaped Grid Policy (0=up, 1=right, 2=down, 3=left):")
print(np.reshape(np.argmax(policy, axis=1), env.shape))
print("")
print("Value Function:")
print(v)
print("")
print("Reshaped Grid Value Function:")
print(v.reshape(env.shape))
print("")
Q-Learning is an off-policy, model-free RL algorithm.
SARSA is an on-policy, model-free RL algorithm.
Flappybird is a side-scrolling game where the agent must successfully nagivate through gaps between pipes.
Next, we wiil train an agent to play "Flappy Bird" game using Q-learning algorithm.

First, we should install PyGame Learning Environment(PLE) which provides the environment to train an agent.
command: git clone https://github.com/ntasfi/PyGame-Learning-Environment
command: pip install -e .(Please don't ignore this period)
command: pip install pygame==1.9.6
cautions: PyGame-Learning-Environment is based on pygame 1.x.x. Please do not install pygame 2.0.0 which is just released two months ago, you will definitely get errors.
Now, we can train our agent to play the game.
It is not necessary to create the code file in the in the PyGame-Learning-Environment folder, you could create the code file wherever you want.
from ple.games.flappybird import FlappyBird
from ple import PLE
import matplotlib.pyplot as plt
import os
import numpy as np
%matplotlib inline
os.environ["SDL_VIDEODRIVER"] = "dummy" # this line disable pop-out window
game = FlappyBird()
env = PLE(game, fps=30, display_screen=False) # environment interface to game
env.reset_game()
# return a dictionary whose key is action description and value is action index
print(game.actions)
# return a list of action index (include None)
print(env.getActionSet())
# a dictionary describe state
'''
player y position.
players velocity.
next pipe distance to player
next pipe top y position
next pipe bottom y position
next next pipe distance to player
next next pipe top y position
next next pipe bottom y position
'''
game.getGameState()
import math
import copy
from collections import defaultdict
MIN_EXPLORING_RATE = 0.01
MIN_LEARNING_RATE = 0.5
class Agent:
def __init__(self,
bucket_range_per_feature,
num_action,
t=0,
discount_factor=0.99):
self.update_parameters(t) # init explore rate and learning rate
self.q_table = defaultdict(lambda: np.zeros(num_action))
self.discount_factor = discount_factor
self.num_action = num_action
# how to discretize each feature in a state
# the higher each value, less time to train but with worser performance
# e.g. if range = 2, feature with value 1 is equal to feature with value 0 bacause int(1/2) = int(0/2)
self.bucket_range_per_feature = bucket_range_per_feature
def select_action(self, state):
# epsilon-greedy
state_idx = self.get_state_idx(state)
if np.random.rand() < self.exploring_rate:
action = np.random.choice(num_action) # Select a random action
else:
action = np.argmax(
self.q_table[state_idx]) # Select the action with the highest q
return action
def update_policy(self, state, action, reward, state_prime):
state_idx = self.get_state_idx(state)
state_prime_idx = self.get_state_idx(state_prime)
# Update Q_value using Q-learning update rule
best_q = np.max(self.q_table[state_prime_idx])
self.q_table[state_idx][action] += self.learning_rate * (
reward + self.discount_factor * best_q - self.q_table[state_idx][action])
def get_state_idx(self, state):
# instead of using absolute position of pipe, use relative position
state = copy.deepcopy(state)
state['next_next_pipe_bottom_y'] -= state['player_y']
state['next_next_pipe_top_y'] -= state['player_y']
state['next_pipe_bottom_y'] -= state['player_y']
state['next_pipe_top_y'] -= state['player_y']
# sort to make list converted from dict ordered in alphabet order
state_key = [k for k, v in sorted(state.items())]
# do bucketing to decrease state space to speed up training
state_idx = []
for key in state_key:
state_idx.append(
int(state[key] / self.bucket_range_per_feature[key]))
return tuple(state_idx)
def update_parameters(self, episode):
self.exploring_rate = max(MIN_EXPLORING_RATE,
min(0.5, 0.99**((episode) / 30)))
self.learning_rate = max(MIN_LEARNING_RATE, min(0.5, 0.99
** ((episode) / 30)))
def shutdown_explore(self):
# make action selection greedy
self.exploring_rate = 0
num_action = len(env.getActionSet())
bucket_range_per_feature = {
'next_next_pipe_bottom_y': 40,
'next_next_pipe_dist_to_player': 512,
'next_next_pipe_top_y': 40,
'next_pipe_bottom_y': 20,
'next_pipe_dist_to_player': 20,
'next_pipe_top_y': 20,
'player_vel': 4,
'player_y': 16
}
# init agent
agent = Agent(bucket_range_per_feature, num_action)
import moviepy.editor as mpy
def make_anim(images, fps=60, true_image=False):
duration = len(images) / fps
def make_frame(t):
try:
x = images[int(len(images) / duration * t)]
except:
x = images[-1]
if true_image:
return x.astype(np.uint8)
else:
return ((x + 1) / 2 * 255).astype(np.uint8)
clip = mpy.VideoClip(make_frame, duration=duration)
clip.fps = fps
return clip
from IPython.display import Image, display
reward_per_epoch = []
lifetime_per_epoch = []
exploring_rates = []
learning_rates = []
print_every_episode = 500
show_gif_every_episode = 5000
NUM_EPISODE = 40000
for episode in range(0, NUM_EPISODE):
# Reset the environment
env.reset_game()
# record frame
frames = [env.getScreenRGB()]
# for every 500 episodes, shutdown exploration to see performance of greedy action
if episode % print_every_episode == 0:
agent.shutdown_explore()
# the initial state
state = game.getGameState()
# cumulate reward for this episode
cum_reward = 0
t = 0
while not env.game_over():
# select an action
action = agent.select_action(state)
# execute the action and get reward
# reward = +1 when pass a pipe, -5 when die
reward = env.act(env.getActionSet()[action])
frames.append(env.getScreenRGB())
# cumulate reward
cum_reward += reward
# observe the result
state_prime = game.getGameState() # get next state
# update agent
agent.update_policy(state, action, reward, state_prime)
# Setting up for the next iteration
state = state_prime
t += 1
# update exploring_rate and learning_rate
agent.update_parameters(episode)
if episode % print_every_episode == 0:
print("Episode {} finished after {} time steps, cumulated reward: {}, exploring rate: {}, learning rate: {}".format(
episode,
t,
cum_reward,
agent.exploring_rate,
agent.learning_rate
))
reward_per_epoch.append(cum_reward)
exploring_rates.append(agent.exploring_rate)
learning_rates.append(agent.learning_rate)
lifetime_per_epoch.append(t)
# for every 5000 episode, record an animation
if episode % show_gif_every_episode == 0:
print("len frames:", len(frames))
clip = make_anim(frames, fps=60, true_image=True).rotate(-90)
display(clip.ipython_display(fps=60, autoplay=1, loop=1))
def demo():
# Reset the environment
env.reset_game()
# record frame
frames = [env.getScreenRGB()]
# shutdown exploration to see performance of greedy action
agent.shutdown_explore()
# the initial state
state = game.getGameState()
while not env.game_over():
# select an action
action = agent.select_action(state)
# execute the action and get reward
reward = env.act(env.getActionSet()[action])
frames.append(env.getScreenRGB())
# observe the result
state_prime = game.getGameState() # get next state
# Setting up for the next iteration
state = state_prime
clip = make_anim(frames, fps=60, true_image=True).rotate(-90)
display(clip.ipython_display(fps=60, autoplay=1, loop=1))
demo()
# plot life time against training episodes
fig, ax1 = plt.subplots(figsize=(20, 5))
plt.plot(range(len(lifetime_per_epoch)), lifetime_per_epoch)
fig.tight_layout()
plt.show()
# plot reward against training episodes
fig, ax1 = plt.subplots(figsize=(20, 5))
plt.plot(range(len(reward_per_epoch)), reward_per_epoch)
plt.show()
Write a brief report in the notebook
Upload both ipynb and html to google drive
Share your drive's link via ILMS
Deadline: 2020-12-24(Thur) 23:59.